package com.sweetdream.cep

import java.util

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

/**
 * One device reading as parsed from a comma-separated input line.
 *
 * All fields are kept as raw text; consumers convert them where needed.
 *
 * @param sensorMac   MAC address of the sensor
 * @param deviceMac   MAC address of the monitored device
 * @param temperature temperature reading
 * @param dampness    dampness reading
 * @param pressure    pressure reading
 * @param date        timestamp of the reading
 */
case class DeviceDetail(
  sensorMac: String,
  deviceMac: String,
  temperature: String,
  dampness: String,
  pressure: String,
  date: String
)

/**
 * Information about a device that raised an alarm.
 *
 * @param sensorMac   MAC address of the sensor
 * @param deviceMac   MAC address of the monitored machine
 * @param temperature temperature reading, kept as raw text
 */
case class AlarmDevice(
  sensorMac: String,
  deviceMac: String,
  temperature: String
)

/**
 * Device temperature monitoring built on Flink CEP.
 *
 * Reads comma-separated device readings from the classpath resource `/temp`, keys them by
 * sensor MAC address, and prints an [[AlarmDevice]] for every match of three readings at or
 * above the alarm threshold that occur within three minutes on the same sensor.
 */
object checkTemperature {

  // Layout of the `date` column; used to derive the event-time timestamp.
  private val format: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  // Classpath location of the input file.
  private val InputResource = "/temp"

  // A reading at or above this temperature counts towards an alarm.
  private val AlarmThreshold = 40

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line arguments (unused)
   * @throws IllegalStateException if the input resource is not on the classpath
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the input data. getResource returns null when the resource is missing, so fail
    // with a descriptive message instead of a bare NullPointerException on getPath.
    val resource = Option(getClass.getResource(InputResource)).getOrElse(
      throw new IllegalStateException(
        s"Input resource '$InputResource' was not found on the classpath")
    )
    val dataStream = env.readTextFile(resource.getPath)

    val deviceDS: KeyedStream[DeviceDetail, String] = dataStream.map(line => {
      // Expected columns: sensorMac, deviceMac, temperature, dampness, pressure, date
      val fields: Array[String] = line.split(",")
      DeviceDetail(fields(0), fields(1), fields(2), fields(3), fields(4), fields(5))
    })
      // Event time is taken from the `date` column; timestamps are expected to be ascending.
      .assignAscendingTimestamps(detail => {
        format.parse(detail.date).getTime
      })
      .keyBy(detail => detail.sensorMac)

    // Define the pattern: three over-threshold readings within three minutes.
    // followedByAny does not require the matching readings to be adjacent.
    val pattern: Pattern[DeviceDetail, DeviceDetail] = Pattern.begin[DeviceDetail]("start")
      .where(d => d.temperature.toInt >= AlarmThreshold)
      .followedByAny("second")
      .where(d => d.temperature.toInt >= AlarmThreshold)
      .followedByAny("third")
      .where(d => d.temperature.toInt >= AlarmThreshold)
      .within(Time.minutes(3))

    // Pattern detection: apply the pattern to the keyed stream.
    val patternResult: PatternStream[DeviceDetail] = CEP.pattern(deviceDS, pattern)

    // Select the results and print the generated alarms.
    patternResult.select(new MyPatternResultFunction).print()

    // Start the job.
    env.execute("startTempeature")

  }
}

/**
 * Custom PatternSelectFunction that turns one complete match of the three-stage pattern
 * ("start", "second", "third") into an [[AlarmDevice]].
 */
class MyPatternResultFunction extends PatternSelectFunction[DeviceDetail, AlarmDevice] {

  /**
   * Prints the event matched by each stage and builds the alarm from the last one.
   *
   * @param map matched events grouped by pattern stage name
   * @return alarm carrying the sensor MAC, device MAC and temperature of the "third" event
   */
  override def select(map: util.Map[String, util.List[DeviceDetail]]): AlarmDevice = {
    val startResult: DeviceDetail = firstMatch(map, "start")
    println("第一条数据： " + startResult)
    // Each stage must be read under its own name; reading "start" for all three stages
    // would report the first event three times and build the alarm from the wrong event.
    val secondResult: DeviceDetail = firstMatch(map, "second")
    println("第二条数据： " + secondResult)
    val thirdResult: DeviceDetail = firstMatch(map, "third")
    println("第三条数据： " + thirdResult)
    AlarmDevice(thirdResult.sensorMac, thirdResult.deviceMac, thirdResult.temperature)
  }

  // Returns the first event matched by the given pattern stage.
  private def firstMatch(
      matches: util.Map[String, util.List[DeviceDetail]],
      stage: String): DeviceDetail =
    matches.get(stage).iterator().next()
}